Source code for hysop.topology.cartesian_descriptor

# Copyright (c) HySoP 2011-2024
#
# This file is part of HySoP software.
# See "https://particle_methods.gricad-pages.univ-grenoble-alpes.fr/hysop-doc/"
# for further info.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import hashlib, copy
import numpy as np

from hysop.tools.htypes import check_instance, to_tuple
from hysop.topology.topology_descriptor import TopologyDescriptor
from hysop.topology.cartesian_topology import CartesianTopology
from hysop.tools.parameters import CartesianDiscretization
from hysop.constants import Backend, BoundaryCondition
from hysop.fields.continuous_field import Field
from hysop.tools.numpywrappers import npw


[docs] class CartesianTopologyDescriptor(TopologyDescriptor): """ Describes how a CartesianTopology topology should be built. """ __slots__ = ( "_mpi_params", "_domain", "_backend", "_extra_kwds", "_cartesian_discretization", "_space_step", ) def __init__(self, mpi_params, domain, backend, cartesian_discretization, **kwds): """ Initialize a CartesianTopologyDescriptor. Notes ----- kwds allows for backend specific variables. CartesianTopologyDescriptor is immutable. """ super().__init__(mpi_params=mpi_params, domain=domain, backend=backend, **kwds) check_instance(cartesian_discretization, CartesianDiscretization) # check cartesian_discretization if (cartesian_discretization.ghosts > 0).any(): msg = "No ghost allowed for a topology descriptor." raise ValueError(msg) global_resolution = cartesian_discretization.global_resolution grid_resolution = cartesian_discretization.grid_resolution lboundaries = cartesian_discretization.lboundaries rboundaries = cartesian_discretization.rboundaries check_instance(grid_resolution, np.ndarray, size=domain.dim, minval=2) check_instance(global_resolution, np.ndarray, size=domain.dim, minval=2) check_instance( lboundaries, npw.ndarray, dtype=object, size=domain.dim, values=BoundaryCondition, allow_none=True, ) check_instance( rboundaries, npw.ndarray, dtype=object, size=domain.dim, values=BoundaryCondition, allow_none=True, ) is_lperiodic = lboundaries == BoundaryCondition.PERIODIC is_rperiodic = rboundaries == BoundaryCondition.PERIODIC assert all((grid_resolution + is_lperiodic) == global_resolution) msg = "Invalid boundary conditions {} vs {}." msg = msg.format(lboundaries, rboundaries) assert not (is_lperiodic ^ is_rperiodic).any(), msg # compute space step space_step = npw.asrealarray(domain.length / (global_resolution - 1)) npw.set_readonly(space_step) self._cartesian_discretization = cartesian_discretization self._space_step = space_step @property def global_resolution(self): """Get the global global_resolution of the discretization (logical grid_size).""" return self._cartesian_discretization.global_resolution @property def grid_resolution(self): """Get the global grid resolution of the discretization (effective grid size).""" return self._cartesian_discretization.grid_resolution @property def lboundaries(self): """Get the left boundaries.""" return self._cartesian_discretization.lboundaries @property def rboundaries(self): """Get the left boundaries.""" return self._cartesian_discretization.rboundaries @property def boundaries(self): """Get left and right boundaries.""" return ( self._cartesian_discretization.lboundaries, self._cartesian_discretization.rboundaries, ) @property def space_step(self): """Get the space step.""" return self._space_step
[docs] def match(self, other, invert=False): """Test if this descriptor is equivalent to the other one.""" eq = super().match(other, invert=False) if (eq is NotImplemented) or ( not isinstance(other, CartesianTopologyDescriptor) ): return NotImplemented eq &= self._cartesian_discretization == other._cartesian_discretization if invert: return not eq else: return eq
def __eq__(self, other): return self.match(other) def __ne__(self, other): return self.match(other, invert=True) def __lt__(self, other): return (self != other) and (str(self) < str(other)) def __hash__(self): # hash(super(...)) does not work as expected so be call __hash__ directly h = super().__hash__() h ^= hash(self._cartesian_discretization) return h def __str__(self): return ":CartesianTopologyDescriptor: backend={}, domain={}, grid_resolution={}, bc=[{}]".format( self.backend, self.domain.full_tag, self.grid_resolution, ",".join( "{}/{}".format( str(lb).replace("HOMOGENEOUS_", "")[:3], str(rb).replace("HOMOGENEOUS_", "")[:3], ) for (lb, rb) in zip(*self.boundaries) ), )
[docs] @classmethod def build_descriptor(cls, backend, operator, field, handle, **kwds): from hysop.core.graph.computational_operator import ComputationalGraphOperator check_instance(backend, Backend) check_instance(operator, ComputationalGraphOperator) check_instance(field, Field) check_instance(handle, CartesianTopologyDescriptors) if isinstance(handle, (tuple, list, np.ndarray, CartesianDiscretization)): if not hasattr(operator, "mpi_params"): msg = f"mpi_params has not been set in operator {operator.name}." raise RuntimeError(msg) if isinstance(handle, CartesianDiscretization): if handle.ghosts.sum() > 0: msg = "A CartesianTopology topology descriptor should not contain any ghosts, " msg += "they will be determined during the get_field_requirements() in the " msg += " operator initialization step to minimize the number of topologies created." msg += "\nIf you want to impose a specific topology, you can directly pass a " msg += "CartesianTopology instance into operator's input or output variables " msg += "dictionnary instead." raise ValueError(msg) if (handle.lboundaries is not None) or (handle.rboundaries is not None): msg = "A CartesianTopology topology descriptor should not contain any boundary conditions, " msg += ( "they will be automatically determined from continuous fields." ) raise ValueError(msg) global_resolution = handle.resolution else: global_resolution = handle cartesian_discretization = CartesianDiscretization( resolution=global_resolution, lboundaries=field.lboundaries_kind, rboundaries=field.rboundaries_kind, ghosts=None, ) kwds.setdefault("mpi_params", operator.mpi_params) kwds.setdefault("domain", field.domain) return CartesianTopologyDescriptor( backend=backend, cartesian_discretization=cartesian_discretization, **kwds, ) elif isinstance(handle, CartesianTopologyDescriptor): return handle else: # handle is a CartesianTopology instance, ghosts and boundary conditions # can be imposed freely by user here. return handle
[docs] def choose_topology(self, known_topologies, **kwds): """ Find optimal topology parameters from known_topologies. If None is returned, create_topology will be called instead. """ if known_topologies: ordered_topologies = tuple( sorted(known_topologies, key=lambda topo: sum(topo.ghosts)) ) return ordered_topologies[0] else: return None
[docs] def create_topology(self, cutdirs, ghosts): """ Build a topology with the current TopologyDescriptor. Free parameters are cutdir and ghosts which are imposed by operators on variables and solved during operator's method get_field_requirements(). """ discretization = CartesianDiscretization( resolution=self.grid_resolution, lboundaries=self.lboundaries, rboundaries=self.rboundaries, ghosts=ghosts, ) return CartesianTopology( domain=self.domain, discretization=discretization, mpi_params=self.mpi_params, cutdirs=cutdirs, backend=self.backend, **self.extra_kwds, )
CartesianTopologyDescriptors = ( CartesianTopology, CartesianTopologyDescriptor, CartesianDiscretization, tuple, list, np.ndarray, type(None), ) """ Instance of those types can be used to create a CartesianTopologyDescriptor. Thus they can be passed in the variables of each operator supporting CartesianTopology topologies. """
[docs] def get_topo_descriptor_discretization(td): """ Get grid resolution from any type of CartesianTopologyDescriptor. """ check_instance(td, CartesianTopologyDescriptors, allow_none=True) if td is None: return None elif isinstance(td, CartesianTopology): td = td.grid_resolution elif isinstance(td, CartesianTopologyDescriptor): td = td.grid_resolution elif isinstance(td, CartesianDiscretization): td = td.grid_resolution return to_tuple(td)